from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.models import Variable
from spmi_analysis.dm.dm_operation_bill_fee_daily_summary import spmi_dm__dm_operation_bill_fee_daily_summary
from spmi_analysis.tidb.tidb_dm_operation_bill_fee_month_summary import tidb_dm__dm_operation_bill_fee_month_summary
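
# Pushes the daily operation bill fee summary from Hive into TiDB with sqoop,
# after the upstream Hive build and the monthly TiDB push (see the dependency
# declaration at the bottom of this file) have completed.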

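# TiDB connection settings, read from Airflow Variables at DAG-parse time;
# they must be defined in the Airflow metadata DB (Admin -> Variables).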
tidb_host = Variable.get('bigdata_tidb_host')
tidb_port = Variable.get('bigdata_tidb_port')
tidb_url = Variable.get('bigdata_tidb_url')
tidb_user = Variable.get('bigdata_tidb_user')
tidb_password = Variable.get('bigdata_tidb_password')

# ############ Hive-to-TiDB push settings ############
# The Hive source table is assumed to be partitioned by dt; if yours is not,
# adjust the sqoop command to match your table.
hive_table = 'spmi_tmp.dm_operation_bill_fee_daily_summary_push'
# TiDB target table
tidb_table = 'spmi_dm.yl_jms_spmi_operation_bill_sum_pt'
# Whether the TiDB table is partitioned: 'true' = partitioned, 'false' = not partitioned
if_partitions = 'true'
# Number of days pushed per run
interval_dt = 60
# Number of sqoop map tasks (max 100; 20-50 usually pushes data at the
# hundred-million-row scale within 15 minutes)
sqoop_maps = 60


tidb_dm__dm_operation_bill_fee_daily_summary = BashOperator(
    task_id='spmi_tidb__dm_operation_bill_fee_daily_summary',
    execution_timeout=timedelta(hours=2),
    email=['matthew.xiong@jtexpress.com', 'yl_bigdata@yl-scm.com'],
    bash_command="spmi_analysis/tidb/tidb_dm_operation_bill_fee_daily_summary/execute.sh",
    pool='spmi_apack',
    retries=1,
    pool_slots=2,
    params={'tidb_host': tidb_host, 'tidb_port': tidb_port, 'tidb_url': tidb_url, 'tidb_user': tidb_user,
            'tidb_password': tidb_password, 'hive_table': hive_table, 'tidb_table': tidb_table,
            'if_partitions': if_partitions, 'interval_dt': interval_dt, 'sqoop_maps': sqoop_maps}
)
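
# Because bash_command ends in ".sh", Airflow loads execute.sh as a Jinja
# template and renders it with the params above. A minimal sketch of the kind
# of sqoop call such a script could make (illustrative only; the flags below
# are standard sqoop export options, but the real execute.sh is not shown here):
#
#   sqoop export \
#       --connect "{{ params.tidb_url }}" \
#       --username "{{ params.tidb_user }}" \
#       --password "{{ params.tidb_password }}" \
#       --table "{{ params.tidb_table }}" \
#       --num-mappers {{ params.sqoop_maps }} \
#       --export-dir /warehouse/{{ params.hive_table }}  # hypothetical HDFS path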

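# "<<" sets upstream dependencies: the daily TiDB push runs only after both the
# Hive daily summary build and the monthly TiDB summary push have succeeded.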
tidb_dm__dm_operation_bill_fee_daily_summary << [spmi_dm__dm_operation_bill_fee_daily_summary, tidb_dm__dm_operation_bill_fee_month_summary]